// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

use std::process::Command;

use rstest::rstest;

use async_trait::async_trait;
use insta::{glob, Settings};
use insta_cmd::{assert_cmd_snapshot, get_cargo_bin};
use std::path::PathBuf;
use std::{env, fs};
use testcontainers::core::{CmdWaitFor, ExecCommand, Mount};
use testcontainers::runners::AsyncRunner;
use testcontainers::{ContainerAsync, ImageExt, TestcontainersError};
use testcontainers_modules::minio;

/// Creates a fresh [`Command`] for the `datafusion-cli` binary, whose path is
/// looked up through `get_cargo_bin`.
fn cli() -> Command {
    let binary = get_cargo_bin("datafusion-cli");
    Command::new(binary)
}

/// Builds the snapshot [`Settings`] shared by the tests in this file.
///
/// Starts from a clone of the current settings, stops prepending the module
/// name to snapshot names, and registers output filters that replace
/// run-dependent text (elapsed time, CLI version banner, backtraces).
fn make_settings() -> Settings {
    // (regex, replacement) pairs, registered in this order.
    const FILTERS: [(&str, &str); 3] = [
        (r"Elapsed .* seconds\.", "[ELAPSED]"),
        (r"DataFusion CLI v.*", "[CLI_VERSION]"),
        // Strip everything from `backtrace:` up to the first triple newline.
        (r"(?s)backtrace:.*?\n\n\n", ""),
    ];

    let mut settings = Settings::clone_current();
    settings.set_prepend_module_to_snapshot(false);
    for (pattern, replacement) in FILTERS {
        settings.add_filter(pattern, replacement);
    }
    settings
}

/// Starts a MinIO container and fills a `data` bucket with the test files.
///
/// The directory `../datafusion/core/tests/data` (relative to this crate's
/// manifest directory) is bind-mounted at `/source` inside the container and
/// then copied into the bucket with the `mc` command line client.
///
/// # Panics
///
/// Panics if the test data directory cannot be canonicalized, if the container
/// fails to start, or if any of the provisioning commands fails.
async fn setup_minio_container() -> ContainerAsync<minio::MinIO> {
    const MINIO_ROOT_USER: &str = "TEST-DataFusionLogin";
    const MINIO_ROOT_PASSWORD: &str = "TEST-DataFusionPassword";

    let absolute_data_path = PathBuf::from(env!("CARGO_MANIFEST_DIR"))
        .join("../datafusion/core/tests/data")
        .canonicalize()
        .expect("Failed to get absolute path for test data");

    let start_result = minio::MinIO::default()
        .with_env_var("MINIO_ROOT_USER", MINIO_ROOT_USER)
        .with_env_var("MINIO_ROOT_PASSWORD", MINIO_ROOT_PASSWORD)
        .with_mount(Mount::bind_mount(
            absolute_data_path.to_str().unwrap(),
            "/source",
        ))
        .start()
        .await;

    // Resolve start-up failures first so the provisioning below stays flat.
    let container = match start_result {
        Ok(running) => running,
        Err(TestcontainersError::Client(e)) => {
            panic!("Failed to start MinIO container. Ensure Docker is running and accessible: {e}");
        }
        Err(e) => {
            panic!("Failed to start MinIO container: {e}");
        }
    };

    // We wait for MinIO to be healthy and prepare test files. We do it via CLI to avoid s3 dependency
    let provisioning_steps = [
        ExecCommand::new(["/usr/bin/mc", "ready", "local"]),
        ExecCommand::new([
            "/usr/bin/mc",
            "alias",
            "set",
            "localminio",
            "http://localhost:9000",
            MINIO_ROOT_USER,
            MINIO_ROOT_PASSWORD,
        ]),
        ExecCommand::new(["/usr/bin/mc", "mb", "localminio/data"]),
        ExecCommand::new([
            "/usr/bin/mc",
            "cp",
            "-r",
            "/source/",
            "localminio/data/",
        ]),
    ];

    for step in provisioning_steps {
        // Each step is only considered ready once it exits with status 0.
        let step = step.with_cmd_ready_condition(CmdWaitFor::Exit { code: Some(0) });

        // Capture the debug form up front: `exec` takes the command by value.
        let cmd_ref = format!("{step:?}");

        if let Err(e) = container.exec(step).await {
            let stdout = container.stdout_to_vec().await.unwrap_or_default();
            let stderr = container.stderr_to_vec().await.unwrap_or_default();

            panic!(
                "Failed to execute command: {}\nError: {}\nStdout: {:?}\nStderr: {:?}",
                cmd_ref,
                e,
                String::from_utf8_lossy(&stdout),
                String::from_utf8_lossy(&stderr)
            );
        }
    }

    container
}

#[cfg(test)]
#[ctor::ctor]
fn init() {
    // Enable RUST_LOG logging configuration for tests.
    // The result of `try_init` is intentionally discarded.
    env_logger::try_init().ok();
}

/// Runs `datafusion-cli` once per case with the given arguments and compares
/// the command output against the snapshot whose suffix is `snapshot_name`.
#[rstest]
#[case::exec_multiple_statements(
    "statements",
    ["--command", "select 1; select 2;", "-q"],
)]
#[case::exec_backslash(
    "backslash",
    ["--file", "tests/sql/backslash.sql", "--format", "json", "-q"],
)]
#[case::exec_from_files(
    "files",
    ["--file", "tests/sql/select.sql", "-q"],
)]
#[case::set_batch_size(
    "batch_size",
    ["--command", "show datafusion.execution.batch_size", "-q", "-b", "1"],
)]
#[case::default_explain_plan(
    "default_explain_plan",
    // default explain format should be tree
    ["--command", "EXPLAIN SELECT 123"],
)]
#[case::can_see_indent_format(
    "can_see_indent_format",
    // can choose the old explain format too
    ["--command", "EXPLAIN FORMAT indent SELECT 123"],
)]
#[case::change_format_version(
    "change_format_version",
    ["--file", "tests/sql/types_format.sql", "-q"],
)]
#[test]
fn cli_quick_test<'a>(
    #[case] snapshot_name: &'a str,
    #[case] args: impl IntoIterator<Item = &'a str>,
) {
    // Assemble the invocation for this case.
    let mut cmd = cli();
    cmd.args(args);

    // One snapshot per case, distinguished by its suffix.
    let mut snapshot_settings = make_settings();
    snapshot_settings.set_snapshot_suffix(snapshot_name);
    let _guard = snapshot_settings.bind_to_scope();

    assert_cmd_snapshot!(cmd);
}

/// Checks that the `DATAFUSION_EXPLAIN_FORMAT` environment variable overrides
/// the default explain plan format.
#[test]
fn cli_explain_environment_overrides() {
    // should use the environment variable to override the default explain plan
    let mut cmd = cli();
    cmd.env("DATAFUSION_EXPLAIN_FORMAT", "pgjson");
    cmd.args(["--command", "EXPLAIN SELECT 123"]);

    let mut snapshot_settings = make_settings();
    snapshot_settings.set_snapshot_suffix("explain_plan_environment_overrides");
    let _guard = snapshot_settings.bind_to_scope();

    assert_cmd_snapshot!(cmd);
}

/// Runs `select 1` once per `--format` value and snapshots the output, using
/// the format name as the snapshot suffix.
#[rstest]
#[case("csv")]
#[case("tsv")]
#[case("table")]
#[case("json")]
#[case("nd-json")]
#[case("automatic")]
#[test]
fn test_cli_format<'a>(#[case] format: &'a str) {
    let mut cmd = cli();
    cmd.args(["--command", "select 1", "-q", "--format", format]);

    let mut snapshot_settings = make_settings();
    snapshot_settings.set_snapshot_suffix(format);
    let _guard = snapshot_settings.bind_to_scope();

    assert_cmd_snapshot!(cmd);
}

/// Runs a sort over 500000 generated rows under a `10M` memory limit and
/// snapshots the output for different `--top-memory-consumers` settings.
///
/// Consumer names, byte counts and allocation details are replaced by fixed
/// placeholders through the filters registered below.
#[rstest]
#[case("no_track", ["--top-memory-consumers", "0"])]
#[case("top2", ["--top-memory-consumers", "2"])]
#[case("top3_default", [])]
#[test]
fn test_cli_top_memory_consumers<'a>(
    #[case] snapshot_name: &str,
    #[case] top_memory_consumers: impl IntoIterator<Item = &'a str>,
) {
    // (regex, replacement) pairs, registered in this order.
    let output_filters = [
        (
            r"[^\s]+\#\d+\(can spill: (true|false)\) consumed .*?B, peak .*?B",
            "Consumer(can spill: bool) consumed XB, peak XB",
        ),
        (
            r"Error: Failed to allocate additional .*? for .*? with .*? already allocated for this reservation - .*? remain available for the total pool",
            "Error: Failed to allocate ",
        ),
        (
            r"Resources exhausted: Failed to allocate additional .*? for .*? with .*? already allocated for this reservation - .*? remain available for the total pool",
            "Resources exhausted: Failed to allocate",
        ),
    ];

    let mut snapshot_settings = make_settings();
    snapshot_settings.set_snapshot_suffix(snapshot_name);
    for (pattern, replacement) in output_filters {
        snapshot_settings.add_filter(pattern, replacement);
    }
    let _guard = snapshot_settings.bind_to_scope();

    let sql = "select * from generate_series(1,500000) as t1(v1) order by v1;";
    let mut cmd = cli();
    cmd.args(["--memory-limit", "10M", "--command", sql]);
    // The case-specific flags go after the fixed arguments.
    cmd.args(top_memory_consumers);

    assert_cmd_snapshot!(cmd);
}

/// Feeds every `sql/integration/*.sql` file to `datafusion-cli` on stdin, with
/// the S3 connection details for a MinIO container supplied through `AWS_*`
/// environment variables, and snapshots the output of each run.
///
/// Skipped unless `TEST_STORAGE_INTEGRATION` is set.
#[tokio::test]
async fn test_cli() {
    let storage_tests_enabled = env::var("TEST_STORAGE_INTEGRATION").is_ok();
    if !storage_tests_enabled {
        eprintln!("Skipping external storages integration tests");
        return;
    }

    let container = setup_minio_container().await;
    let port = container.get_host_port_ipv4(9000).await.unwrap();
    // The endpoint is the same for every script, so build it once.
    let endpoint = format!("http://localhost:{port}");

    let snapshot_settings = make_settings();
    let _guard = snapshot_settings.bind_to_scope();

    glob!("sql/integration/*.sql", |path| {
        let script = fs::read_to_string(path).unwrap();

        // Start from an empty environment so only these variables are visible.
        let mut cmd = cli();
        cmd.env_clear()
            .env("AWS_ACCESS_KEY_ID", "TEST-DataFusionLogin")
            .env("AWS_SECRET_ACCESS_KEY", "TEST-DataFusionPassword")
            .env("AWS_ENDPOINT", &endpoint)
            .env("AWS_ALLOW_HTTP", "true");

        assert_cmd_snapshot!(cmd.pass_stdin(script))
    });
}

#[tokio::test]
async fn test_aws_options() {
    // Separate test is needed to pass aws as options in sql and not via env

    if env::var("TEST_STORAGE_INTEGRATION").is_err() {
        eprintln!("Skipping external storages integration tests");
        return;
    }

    let settings = make_settings();
    let _bound = settings.bind_to_scope();

    let container = setup_minio_container().await;
    let port = container.get_host_port_ipv4(9000).await.unwrap();

    let input = format!(
        r#"CREATE EXTERNAL TABLE CARS
STORED AS CSV
LOCATION 's3://data/cars.csv'
OPTIONS(
    'aws.access_key_id' 'TEST-DataFusionLogin',
    'aws.secret_access_key' 'TEST-DataFusionPassword',
    'aws.endpoint' 'http://localhost:{port}',
    'aws.allow_http' 'true'
);

SELECT * FROM CARS limit 1;
"#
    );

    assert_cmd_snapshot!(cli().env_clear().pass_stdin(input));
}

#[tokio::test]
async fn test_aws_region_auto_resolution() {
    if env::var("TEST_STORAGE_INTEGRATION").is_err() {
        eprintln!("Skipping external storages integration tests");
        return;
    }

    let mut settings = make_settings();
    settings.add_filter(r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}Z", "[TIME]");
    let _bound = settings.bind_to_scope();

    let bucket = "s3://clickhouse-public-datasets/hits_compatible/athena_partitioned/hits_1.parquet";
    let region = "us-east-1";

    let input = format!(
        r#"CREATE EXTERNAL TABLE hits
STORED AS PARQUET
LOCATION '{bucket}'
OPTIONS(
    'aws.region' '{region}',
    'aws.skip_signature' true
);

SELECT COUNT(*) FROM hits;
"#
    );

    assert_cmd_snapshot!(cli()
        .env("RUST_LOG", "warn")
        .env_remove("AWS_ENDPOINT")
        .pass_stdin(input));
}

/// Ensures a backtrace is printed when `datafusion-cli` executes a query that
/// triggers an error.
///
/// Example:
///
/// ```text
/// RUST_BACKTRACE=1 cargo run --features backtrace -- -c "select pow(1,'foo');"
/// ```
#[rstest]
#[case("SELECT pow(1,'foo')")]
#[case("SELECT CAST('not_a_number' AS INTEGER);")]
#[cfg(feature = "backtrace")]
fn test_backtrace_output(#[case] query: &str) {
    // Run a failing query with backtraces enabled.
    let mut cmd = cli();
    cmd.env("RUST_BACKTRACE", "1");
    cmd.args(["--command", query, "-q"]);

    let output = cmd.output().expect("Failed to execute command");
    let stdout = String::from_utf8_lossy(&output.stdout);
    let stderr = String::from_utf8_lossy(&output.stderr);

    // The marker may appear on either stream, so search both (case-insensitively).
    let combined_output = [stdout.as_ref(), stderr.as_ref()].concat();
    let mentions_backtrace = combined_output.to_lowercase().contains("backtrace");

    assert!(
        mentions_backtrace,
        "Expected output to contain 'backtrace', but got stdout: '{}' stderr: '{}'",
        stdout,
        stderr
    );
}

#[tokio::test]
async fn test_s3_url_fallback() {
    if env::var("TEST_STORAGE_INTEGRATION").is_err() {
        eprintln!("Skipping external storages integration tests");
        return;
    }

    let container = setup_minio_container().await;

    let mut settings = make_settings();
    settings.set_snapshot_suffix("s3_url_fallback");
    let _bound = settings.bind_to_scope();

    // Create a table using a prefix path (without trailing slash)
    // This should trigger the fallback logic where head() fails on the prefix
    // and list() is used to discover the actual files
    let input = r#"CREATE EXTERNAL TABLE partitioned_data
STORED AS CSV
LOCATION 's3://data/partitioned_csv'
OPTIONS (
    'format.has_header' 'false'
);

SELECT * FROM partitioned_data ORDER BY column_1, column_2 LIMIT 5;
"#;

    assert_cmd_snapshot!(cli().with_minio(&container).await.pass_stdin(input));
}

/// Validate object store profiling output
#[tokio::test]
async fn test_object_store_profiling() {
    if env::var("TEST_STORAGE_INTEGRATION").is_err() {
        eprintln!("Skipping external storages integration tests");
        return;
    }

    let container = setup_minio_container().await;

    let mut settings = make_settings();

    // as the object store profiling contains timestamps and durations, we must
    // filter them out to have stable snapshots
    //
    // Example line to filter:
    // 2025-10-11T12:02:59.722646+00:00 operation=Get duration=0.001495s size=1006 path=cars.csv
    // Output:
    // <TIMESTAMP> operation=Get duration=[DURATION] size=1006 path=cars.csv
    settings.add_filter(
        r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}(?:\.\d+)?[+-]\d{2}:\d{2} operation=(Get|Put|Delete|List|Head) duration=\d+\.\d{6}s (size=\d+\s+)?path=(.*)",
        "<TIMESTAMP> operation=$1 duration=[DURATION] ${2}path=$3",
    );

    // We also need to filter out the summary statistics (anything with an 's' at the end)
    // Example line(s) to filter:
    // | Get       | duration | 5.000000s | 5.000000s | 5.000000s |           | 1         |
    settings.add_filter(
        r"\| (Get|Put|Delete|List|Head)( +)\| duration \| .*? \| .*? \| .*? \| .*? \| (.*?) \|",
        "| $1$2 | duration | ...NORMALIZED...| $3 |",
    );

    let _bound = settings.bind_to_scope();

    let input = r#"
    CREATE EXTERNAL TABLE CARS
STORED AS CSV
LOCATION 's3://data/cars.csv';

-- Initial query should not show any profiling as the object store is not instrumented yet
SELECT * from CARS LIMIT 1;
\object_store_profiling trace
-- Query again to see the full profiling output
SELECT * from CARS LIMIT 1;
\object_store_profiling summary
-- Query again to see the summarized profiling output
SELECT * from CARS LIMIT 1;
\object_store_profiling disabled
-- Final query should not show any profiling as we disabled it again
SELECT * from CARS LIMIT 1;
"#;

    assert_cmd_snapshot!(cli().with_minio(&container).await.pass_stdin(input));
}

/// Extension trait that adds the MinIO connection information to a
/// [`Command`].
#[async_trait]
trait MinioCommandExt {
    /// Configures `self` to connect to the given MinIO `container` and returns
    /// `self` so that further builder calls can be chained.
    async fn with_minio(&mut self, container: &ContainerAsync<minio::MinIO>)
        -> &mut Self;
}

#[async_trait]
impl MinioCommandExt for Command {
    /// Clears the command's environment and sets the `AWS_*` variables that
    /// point at port 9000 of `container` as mapped on the host.
    ///
    /// # Panics
    ///
    /// Panics if the host port for container port 9000 cannot be obtained.
    async fn with_minio(
        &mut self,
        container: &ContainerAsync<minio::MinIO>,
    ) -> &mut Self {
        let port = container.get_host_port_ipv4(9000).await.unwrap();
        let endpoint = format!("http://localhost:{port}");

        // Drop any inherited variables before adding the ones needed here.
        self.env_clear();
        self.env("AWS_ACCESS_KEY_ID", "TEST-DataFusionLogin");
        self.env("AWS_SECRET_ACCESS_KEY", "TEST-DataFusionPassword");
        self.env("AWS_ENDPOINT", endpoint);
        self.env("AWS_ALLOW_HTTP", "true");
        self
    }
}
